Activiti and Design Patterns (7)
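Process diagram of a concrete process after it has been started:
Let's trace how the process moves along and dig out the interesting code.
The state machine jumps through the following operations: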
PROCESS_START = new AtomicOperationProcessStart();
||
PROCESS_START_INITIAL = new AtomicOperationProcessStartInitial();
||
ACTIVITY_EXECUTE = new AtomicOperationActivityExecute();
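The chain above can be pictured as a small state machine in which every operation, once it has done its work, asks the execution to perform the next one. The following is only a minimal sketch of that idea: the class AtomicOperationChainSketch, the Operation interface and the simplified Execution are hypothetical stand-ins, not the Activiti classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the operation chain (not the Activiti classes).
class AtomicOperationChainSketch {

    /** One step of the state machine; the step itself decides what runs next. */
    interface Operation {
        void execute(Execution execution);
    }

    /** Minimal stand-in for an execution: it records which operations were performed. */
    static class Execution {
        final List<String> trace = new ArrayList<>();

        void performOperation(String name, Operation operation) {
            trace.add(name);
            operation.execute(this);
        }
    }

    // Declared in reverse order so that each step can refer to its successor.
    static final Operation ACTIVITY_EXECUTE =
            execution -> { /* a real engine would run the node's behavior here */ };
    static final Operation PROCESS_START_INITIAL =
            execution -> execution.performOperation("ACTIVITY_EXECUTE", ACTIVITY_EXECUTE);
    static final Operation PROCESS_START =
            execution -> execution.performOperation("PROCESS_START_INITIAL", PROCESS_START_INITIAL);

    /** Starts the chain and returns the names of the operations in the order they ran. */
    static List<String> run() {
        Execution execution = new Execution();
        execution.performOperation("PROCESS_START", PROCESS_START);
        return execution.trace;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The point of the design is that no central dispatcher knows the whole route; each operation only knows its successor.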
First, let's look at how the first node, the start event, is executed:
public class NoneStartEventActivityBehavior extends FlowNodeActivityBehavior {
// Nothing to see here.
// The default behaviour of the BpmnActivity is exactly what
// a none start event should be doing.
}
The actual implementation lives in FlowNodeActivityBehavior:
protected BpmnActivityBehavior bpmnActivityBehavior = new BpmnActivityBehavior();
/**
* Default behaviour: just leave the activity with no extra functionality.
*/
public void execute(ActivityExecution execution) throws Exception {
leave(execution);
}
/**
* Default way of leaving a BPMN 2.0 activity: evaluate the conditions on the
* outgoing sequence flow and take those that evaluate to true.
*/
protected void leave(ActivityExecution execution) {
bpmnActivityBehavior.performDefaultOutgoingBehavior(execution);
}
protected void leaveIgnoreConditions(ActivityExecution activityContext) {
bpmnActivityBehavior.performIgnoreConditionsOutgoingBehavior(activityContext);
}
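The relationship between the empty NoneStartEventActivityBehavior and its base class can be sketched as follows: the base class supplies "execute means leave", a subclass with nothing special to do inherits it unchanged, and only a node that has to wait overrides it. All class names below (DefaultBehaviorSketch, FlowNodeBehavior, WaitingBehavior and so on) are hypothetical simplifications, and a plain list of strings stands in for the execution.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of "default behavior in the base class"; not the Activiti classes.
class DefaultBehaviorSketch {

    /** Base behavior: executing a node means leaving it right away. */
    static class FlowNodeBehavior {
        void execute(List<String> log) {
            leave(log);
        }

        void leave(List<String> log) {
            log.add("leave");
        }
    }

    /** Nothing to override: the inherited default is exactly what a start event needs. */
    static class NoneStartEventBehavior extends FlowNodeBehavior {
    }

    /** A node that waits: it overrides execute and leaves only when signalled. */
    static class WaitingBehavior extends FlowNodeBehavior {
        @Override
        void execute(List<String> log) {
            log.add("wait");
        }

        void signal(List<String> log) {
            leave(log);
        }
    }

    static List<String> runStartEvent() {
        List<String> log = new ArrayList<>();
        new NoneStartEventBehavior().execute(log);
        return log;
    }

    static List<String> runWaitingNode() {
        List<String> log = new ArrayList<>();
        WaitingBehavior behavior = new WaitingBehavior();
        behavior.execute(log);
        behavior.signal(log);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(runStartEvent());
        System.out.println(runWaitingNode());
    }
}
```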
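So does the default bpmnActivityBehavior simply follow the default sequence flow to the next node? Not quite. As the code below shows, it first collects the outgoing flows whose conditions hold, and falls back to the default flow only when none of them qualifies: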
protected void performOutgoingBehavior(ActivityExecution execution,
boolean checkConditions, boolean throwExceptionIfExecutionStuck, List<ActivityExecution> reusableExecutions) {
if (log.isDebugEnabled()) {
log.debug("Leaving activity '{}'", execution.getActivity().getId());
}
String defaultSequenceFlow = (String) execution.getActivity().getProperty("default");// Note ①
List<PvmTransition> transitionsToTake = new ArrayList<PvmTransition>();
List<PvmTransition> outgoingTransitions = execution.getActivity().getOutgoingTransitions();
for (PvmTransition outgoingTransition : outgoingTransitions) {
Expression skipExpression = outgoingTransition.getSkipExpression();
// The skip expression is not enabled for this transition
if (!SkipExpressionUtil.isSkipExpressionEnabled(execution, skipExpression)) {
// There is no default flow, or this outgoing transition is not the default flow
if (defaultSequenceFlow == null || !outgoingTransition.getId().equals(defaultSequenceFlow)) {
Condition condition = (Condition) outgoingTransition.getProperty(BpmnParse.PROPERTYNAME_CONDITION);
// The flow has no condition, or condition checking is switched off, or the condition evaluates to true
if (condition == null || !checkConditions || condition.evaluate(outgoingTransition.getId(), execution)) {
transitionsToTake.add(outgoingTransition);
}
}
} else if (SkipExpressionUtil.shouldSkipFlowElement(execution, skipExpression)){
transitionsToTake.add(outgoingTransition);
}
}
// Note ②
if (transitionsToTake.size() == 1) {
// Exactly one transition qualifies: take it directly
execution.take(transitionsToTake.get(0));
} else if (transitionsToTake.size() >= 1) {
// Several transitions qualify: take them all
execution.inactivate();
if (reusableExecutions == null || reusableExecutions.isEmpty()) {
execution.takeAll(transitionsToTake, Arrays.asList(execution));
} else {
execution.takeAll(transitionsToTake, reusableExecutions);
}
} else {
// No outgoing transition qualified; fall back to the default flow if there is one
if (defaultSequenceFlow != null) {
PvmTransition defaultTransition = execution.getActivity().findOutgoingTransition(defaultSequenceFlow);
if (defaultTransition != null) {
execution.take(defaultTransition);
} else {
throw new ActivitiException("Default sequence flow '" + defaultSequenceFlow + "' could not be not found");
}
} else {
// The activity is a compensation handler
Object isForCompensation = execution.getActivity().getProperty(BpmnParse.PROPERTYNAME_IS_FOR_COMPENSATION);
if(isForCompensation != null && (Boolean) isForCompensation) {
if (execution instanceof ExecutionEntity) {
Context.getCommandContext().getHistoryManager().recordActivityEnd((ExecutionEntity) execution);
}
InterpretableExecution parentExecution = (InterpretableExecution) execution.getParent();
((InterpretableExecution)execution).remove();
parentExecution.signal("compensationDone", null);
} else {
// The execution is stuck: end it and, if requested, throw an exception
if (log.isDebugEnabled()) {
log.debug("No outgoing sequence flow found for {}. Ending execution.", execution.getActivity().getId());
}
execution.end();
if (throwExceptionIfExecutionStuck) {
throw new ActivitiException("No outgoing sequence flow of the inclusive gateway '" + execution.getActivity().getId()
+ "' could be selected for continuing the process");
}
}
}
}
}
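Stripped of skip expressions, compensation handling and the "stuck execution" branch, the selection rule in the method above comes down to: take every non-default flow whose condition is absent or true, and use the default flow only if nothing else qualified. Here is a minimal sketch of just that rule; OutgoingFlowSelectionSketch, Flow and the sample flow ids are hypothetical names, and a Predicate over a variable map stands in for the engine's Condition.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical sketch of the selection rule only; not the Activiti implementation.
class OutgoingFlowSelectionSketch {

    /** An outgoing sequence flow; a null condition means "unconditional". */
    static class Flow {
        final String id;
        final Predicate<Map<String, Object>> condition;

        Flow(String id, Predicate<Map<String, Object>> condition) {
            this.id = id;
            this.condition = condition;
        }
    }

    /** Returns the ids of the flows to take, falling back to the default flow. */
    static List<String> select(List<Flow> outgoing, String defaultFlowId, Map<String, Object> variables) {
        List<String> toTake = new ArrayList<>();
        for (Flow flow : outgoing) {
            if (flow.id.equals(defaultFlowId)) {
                continue; // the default flow is never evaluated in the main loop
            }
            if (flow.condition == null || flow.condition.test(variables)) {
                toTake.add(flow.id);
            }
        }
        if (toTake.isEmpty() && defaultFlowId != null) {
            toTake.add(defaultFlowId); // nothing qualified: use the default flow
        }
        return toTake;
    }

    /** Sample flows used for illustration; "toClerk" plays the role of the default flow. */
    static List<Flow> sampleFlows() {
        List<Flow> flows = new ArrayList<>();
        flows.add(new Flow("toManager", vars -> ((Integer) vars.get("amount")) > 100));
        flows.add(new Flow("toAudit", vars -> ((Integer) vars.get("amount")) > 1000));
        flows.add(new Flow("toClerk", null));
        return flows;
    }

    static List<String> selectForAmount(int amount) {
        Map<String, Object> variables = Collections.<String, Object>singletonMap("amount", amount);
        return select(sampleFlows(), "toClerk", variables);
    }

    public static void main(String[] args) {
        System.out.println(selectForAmount(5000));
        System.out.println(selectForAmount(500));
        System.out.println(selectForAmount(10));
    }
}
```

When the returned list has one entry the engine code above calls take, and when it has several it calls takeAll.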
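Moving between the nodes of a process definition therefore starts by finding the outgoing flows whose conditions are satisfied; the execution then takes them: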
public void take(PvmTransition transition) {
take(transition, true);
}
public void take(PvmTransition transition, boolean fireActivityCompletionEvent) {
if (fireActivityCompletionEvent) {
fireActivityCompletedEvent();
}
if (this.transition!=null) {
throw new PvmException("already taking a transition");
}
if (transition==null) {
throw new PvmException("transition is null");
}
setActivity((ActivityImpl)transition.getSource());
setTransition((TransitionImpl) transition);
performOperation(AtomicOperation.TRANSITION_NOTIFY_LISTENER_END);
}
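This so-called transition only changes the activity and the transition on the same ExecutionEntity and then starts TRANSITION_NOTIFY_LISTENER_END. Contrary to what one might expect, though, the activity set here is the transition's source, that is, the activity definition that has just finished executing: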
PROCESS_START = new AtomicOperationProcessStart();
||
PROCESS_START_INITIAL = new AtomicOperationProcessStartInitial();
||
ACTIVITY_EXECUTE = new AtomicOperationActivityExecute();
||
TRANSITION_NOTIFY_LISTENER_END = new AtomicOperationTransitionNotifyListenerEnd();
How it executes:
public void execute(InterpretableExecution execution) {
ScopeImpl scope = getScope(execution);
List<ExecutionListener> exectionListeners = scope.getExecutionListeners(getEventName());
int executionListenerIndex = execution.getExecutionListenerIndex();
if (exectionListeners.size()>executionListenerIndex) {
execution.setEventName(getEventName());
execution.setEventSource(scope);
ExecutionListener listener = exectionListeners.get(executionListenerIndex);
try {
listener.notify(execution);
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
throw new PvmException("couldn't execute event listener : "+e.getMessage(), e);
}
execution.setExecutionListenerIndex(executionListenerIndex+1);
execution.performOperation(this);// Note ①
} else {
execution.setExecutionListenerIndex(0);
execution.setEventName(null);
execution.setEventSource(null);
eventNotificationsCompleted(execution);
}
}
@Override
protected void eventNotificationsCompleted(InterpretableExecution execution) {
execution.performOperation(TRANSITION_DESTROY_SCOPE);
}
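The execute method above notifies one listener per pass: it bumps the listener index and performs the same operation again, and only when the index runs past the list does it call the eventNotificationsCompleted hook. Below is a minimal sketch of that pattern. ListenerLoopSketch, Exec, Listener and EndOperation are hypothetical names, and a direct recursive call stands in for execution.performOperation(this), which is an assumption about control flow made purely for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of "one listener per pass, then a completion hook".
class ListenerLoopSketch {

    interface Listener {
        void onEvent(Exec execution);
    }

    /** Minimal execution: keeps the listener index between passes and a log. */
    static class Exec {
        int listenerIndex = 0;
        final List<String> log = new ArrayList<>();
    }

    /** Template: the loop is fixed here, subclasses supply listeners and the next step. */
    static abstract class EventOperation {
        protected abstract List<Listener> listeners();

        protected abstract void eventNotificationsCompleted(Exec execution);

        void execute(Exec execution) {
            List<Listener> all = listeners();
            if (all.size() > execution.listenerIndex) {
                all.get(execution.listenerIndex).onEvent(execution);
                execution.listenerIndex++;
                execute(execution); // stands in for execution.performOperation(this)
            } else {
                execution.listenerIndex = 0; // reset for the next operation
                eventNotificationsCompleted(execution);
            }
        }
    }

    /** Concrete operation with two listeners; its hook just records that it was reached. */
    static class EndOperation extends EventOperation {
        private final List<Listener> listeners = new ArrayList<>();

        EndOperation() {
            listeners.add(e -> e.log.add("listener A"));
            listeners.add(e -> e.log.add("listener B"));
        }

        @Override
        protected List<Listener> listeners() {
            return listeners;
        }

        @Override
        protected void eventNotificationsCompleted(Exec execution) {
            execution.log.add("next operation");
        }
    }

    static Exec run() {
        Exec execution = new Exec();
        new EndOperation().execute(execution);
        return execution;
    }

    public static void main(String[] args) {
        System.out.println(run().log);
    }
}
```

Keeping the index on the execution rather than in a local loop variable means a pass can be interrupted and resumed at the next listener.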
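Note ①: this call re-enters the same operation, so the listeners are notified one after another, one per pass.
Execution has now reached TRANSITION_DESTROY_SCOPE: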
||
TRANSITION_DESTROY_SCOPE = new AtomicOperationTransitionDestroyScope();
public class AtomicOperationTransitionDestroyScope implements AtomicOperation
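Note: TRANSITION_DESTROY_SCOPE implements the AtomicOperation interface directly, instead of building on the abstract class the way TRANSITION_NOTIFY_LISTENER_END does. Perhaps this is also why there is both an interface and an abstract class?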
public void execute(InterpretableExecution execution) {
InterpretableExecution propagatingExecution = null;
ActivityImpl activity = (ActivityImpl) execution.getActivity();
// if this transition is crossing a scope boundary
if (activity.isScope()) {// is the current activity a scope, i.e. a node that contains other activities?
InterpretableExecution parentScopeInstance = null;
// if this is a concurrent execution crossing a scope boundary
if (execution.isConcurrent() && !execution.isScope()) {
// first remove the execution from the current root
InterpretableExecution concurrentRoot = (InterpretableExecution) execution.getParent();
parentScopeInstance = (InterpretableExecution) execution.getParent().getParent();
log.debug("moving concurrent {} one scope up under {}", execution, parentScopeInstance);
List<InterpretableExecution> parentScopeInstanceExecutions = (List<InterpretableExecution>) parentScopeInstance.getExecutions();
List<InterpretableExecution> concurrentRootExecutions = (List<InterpretableExecution>) concurrentRoot.getExecutions();
// if the parent scope had only one single scope child
if (parentScopeInstanceExecutions.size()==1) {
// it now becomes a concurrent execution
parentScopeInstanceExecutions.get(0).setConcurrent(true);
}
concurrentRootExecutions.remove(execution);
parentScopeInstanceExecutions.add(execution);
execution.setParent(parentScopeInstance);
execution.setActivity(activity);
propagatingExecution = execution;
// if there is only a single concurrent execution left
// in the concurrent root, auto-prune it. meaning, the
// last concurrent child execution data should be cloned into
// the concurrent root.
if (concurrentRootExecutions.size()==1) {
InterpretableExecution lastConcurrent = concurrentRootExecutions.get(0);
if (lastConcurrent.isScope()) {
lastConcurrent.setConcurrent(false);
} else {
log.debug("merging last concurrent {} into concurrent root {}", lastConcurrent, concurrentRoot);
// We can't just merge the data of the lastConcurrent into the concurrentRoot.
// This is because the concurrent root might be in a takeAll-loop. So the
// concurrent execution is the one that will be receiving the take
concurrentRoot.setActivity((ActivityImpl) lastConcurrent.getActivity());
concurrentRoot.setActive(lastConcurrent.isActive());
lastConcurrent.setReplacedBy(concurrentRoot);
lastConcurrent.remove();
}
}
} else if (execution.isConcurrent() && execution.isScope()) {
log.debug("scoped concurrent {} becomes concurrent and remains under {}", execution, execution.getParent());
// TODO!
execution.destroy();
propagatingExecution = execution;
} else {
propagatingExecution = (InterpretableExecution) execution.getParent();
propagatingExecution.setActivity((ActivityImpl) execution.getActivity());
propagatingExecution.setTransition(execution.getTransition());
propagatingExecution.setActive(true);
log.debug("destroy scope: scoped {} continues as parent scope {}", execution, propagatingExecution);
execution.destroy();
execution.remove();
}
} else {
propagatingExecution = execution;
}
// if there is another scope element that is ended
// The parent element; for an ordinary element the parent is the ProcessDefinition
ScopeImpl nextOuterScopeElement = activity.getParent();
TransitionImpl transition = propagatingExecution.getTransition();
// The destination node
ActivityImpl destination = transition.getDestination();
// Check whether the destination lies outside the parent scope (for example, the element that follows the end of a subprocess is outside the subprocess)
if (transitionLeavesNextOuterScope(nextOuterScopeElement, destination)) {
propagatingExecution.setActivity((ActivityImpl) nextOuterScopeElement);
// Enter TRANSITION_NOTIFY_LISTENER_END once more
propagatingExecution.performOperation(TRANSITION_NOTIFY_LISTENER_END);
} else {
// Move on to TRANSITION_NOTIFY_LISTENER_TAKE
propagatingExecution.performOperation(TRANSITION_NOTIFY_LISTENER_TAKE);
}
}
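The decision at the end of this method depends on whether the destination still sits inside the enclosing scope. The body of transitionLeavesNextOuterScope is not shown here, so the following is only a sketch of one plausible containment check over a parent chain; ScopeBoundarySketch, Node and the sample node ids are hypothetical.

```java
// Hypothetical sketch of a "does the transition leave this scope?" check.
class ScopeBoundarySketch {

    /** A node in the definition tree; the process definition itself has no parent. */
    static class Node {
        final String id;
        final Node parent;

        Node(String id, Node parent) {
            this.id = id;
            this.parent = parent;
        }
    }

    /** True if 'node' is nested, at any depth, inside 'scope'. */
    static boolean contains(Node scope, Node node) {
        for (Node p = node.parent; p != null; p = p.parent) {
            if (p == scope) {
                return true;
            }
        }
        return false;
    }

    /** The transition leaves the scope when its destination is not inside it. */
    static boolean leavesScope(Node scope, Node destination) {
        return !contains(scope, destination);
    }

    // Sample tree: process -> { subProcess -> { innerTask }, afterTask }
    static final Node PROCESS = new Node("process", null);
    static final Node SUB_PROCESS = new Node("subProcess", PROCESS);
    static final Node INNER_TASK = new Node("innerTask", SUB_PROCESS);
    static final Node AFTER_TASK = new Node("afterTask", PROCESS);

    public static void main(String[] args) {
        // Destination inside the subprocess: stay in the scope, continue with TAKE.
        System.out.println(leavesScope(SUB_PROCESS, INNER_TASK));
        // Destination outside the subprocess: the scope must be ended first.
        System.out.println(leavesScope(SUB_PROCESS, AFTER_TASK));
    }
}
```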
From the final branch we can see a loop:
PROCESS_START = new AtomicOperationProcessStart();
||
PROCESS_START_INITIAL = new AtomicOperationProcessStartInitial();
||
ACTIVITY_EXECUTE = new AtomicOperationActivityExecute();
||
TRANSITION_NOTIFY_LISTENER_END = new AtomicOperationTransitionNotifyListenerEnd();
||
TRANSITION_DESTROY_SCOPE = new AtomicOperationTransitionDestroyScope();
||
TRANSITION_NOTIFY_LISTENER_TAKE, or TRANSITION_NOTIFY_LISTENER_END (in the second case the loop simply runs once more)
TRANSITION_NOTIFY_LISTENER_TAKE holds the logic that belongs to the sequence flow itself:
TRANSITION_NOTIFY_LISTENER_TAKE = new AtomicOperationTransitionNotifyListenerTake();
Execution logic:
public void execute(InterpretableExecution execution) {
TransitionImpl transition = execution.getTransition();
List<ExecutionListener> executionListeners = transition.getExecutionListeners();
int executionListenerIndex = execution.getExecutionListenerIndex();
if (executionListeners.size()>executionListenerIndex) {
execution.setEventName(org.activiti.engine.impl.pvm.PvmEvent.EVENTNAME_TAKE);
execution.setEventSource(transition);
ExecutionListener listener = executionListeners.get(executionListenerIndex);
try {
listener.notify(execution);
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
throw new PvmException("couldn't execute event listener : "+e.getMessage(), e);
}
execution.setExecutionListenerIndex(executionListenerIndex+1);
execution.performOperation(this);
// The part above is essentially the same as the logic in the abstract class
} else {
if (log.isDebugEnabled()) {
log.debug("{} takes transition {}", execution, transition);
}
execution.setExecutionListenerIndex(0);
execution.setEventName(null);
execution.setEventSource(null);
ActivityImpl activity = (ActivityImpl) execution.getActivity();
ActivityImpl nextScope = findNextScope(activity.getParent(), transition.getDestination());
execution.setActivity(nextScope);// only here is the ExecutionEntity's activity changed again; the transition is still unchanged
// Firing event that transition is being taken ...
execution.performOperation(TRANSITION_CREATE_SCOPE);
}
}
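The findNextScope call above is not listed in this article. Judging from how it is used, it picks the outermost element that has to be entered next on the way to the destination. The following is a sketch of one way to do that, by walking up the destination's parent chain until the parent is the given outer scope or the process definition. NextScopeSketch, Node and the sample ids are hypothetical, and the real method may differ in detail.

```java
// Hypothetical sketch of "find the next scope to enter on the way to the destination".
class NextScopeSketch {

    /** A node in the definition tree; the process definition itself has no parent. */
    static class Node {
        final String id;
        final Node parent;

        Node(String id, Node parent) {
            this.id = id;
            this.parent = parent;
        }

        boolean isProcessDefinition() {
            return parent == null;
        }
    }

    /**
     * Walks up from the destination and returns the ancestor (or the destination
     * itself) whose parent is 'outerScope' or the process definition.
     * Assumes 'destination' is an activity, so it always has a parent.
     */
    static Node findNextScope(Node outerScope, Node destination) {
        Node next = destination;
        while (!next.parent.isProcessDefinition() && next.parent != outerScope) {
            next = next.parent;
        }
        return next;
    }

    // Sample tree: process -> { startEvent, userTask, subProcess -> { innerTask } }
    static final Node PROCESS = new Node("process", null);
    static final Node START_EVENT = new Node("startEvent", PROCESS);
    static final Node USER_TASK = new Node("userTask", PROCESS);
    static final Node SUB_PROCESS = new Node("subProcess", PROCESS);
    static final Node INNER_TASK = new Node("innerTask", SUB_PROCESS);

    public static void main(String[] args) {
        // Flat case: the next scope is the destination itself.
        System.out.println(findNextScope(PROCESS, USER_TASK).id);
        // Nested case: the subprocess has to be entered before the inner task.
        System.out.println(findNextScope(PROCESS, INNER_TASK).id);
    }
}
```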
Next comes TRANSITION_CREATE_SCOPE:
TRANSITION_CREATE_SCOPE = new AtomicOperationTransitionCreateScope();
Execution logic:
public void execute(InterpretableExecution execution) {
InterpretableExecution propagatingExecution = null;
ActivityImpl activity = (ActivityImpl) execution.getActivity();
if (activity.isScope()) {// the activity is a scope
// A new Execution is created here, called propagatingExecution
// Its transition is still (startevent1)--flow1-->(usertask1),
// not null
propagatingExecution = (InterpretableExecution) execution.createExecution();
propagatingExecution.setActivity(activity);
propagatingExecution.setTransition(execution.getTransition());
execution.setTransition(null);
execution.setActivity(null);
execution.setActive(false);
// The original execution's activity and transition are now both null
log.debug("create scope: parent {} continues as execution {}", execution, propagatingExecution);
propagatingExecution.initialize();
} else {
propagatingExecution = execution;
}
propagatingExecution.performOperation(AtomicOperation.TRANSITION_NOTIFY_LISTENER_START);
}
Now we reach AtomicOperation.TRANSITION_NOTIFY_LISTENER_START:
public class AtomicOperationTransitionNotifyListenerStart extends AbstractEventAtomicOperation
It extends the abstract class AbstractEventAtomicOperation. What it does once the listeners have been notified:
@Override
protected void eventNotificationsCompleted(InterpretableExecution execution) {
TransitionImpl transition = execution.getTransition();
ActivityImpl destination = null;
if(transition == null) { // this is null after async cont. -> transition is not stored in execution
destination = (ActivityImpl) execution.getActivity();
} else {
destination = transition.getDestination();
}
ActivityImpl activity = (ActivityImpl) execution.getActivity();
if (activity!=destination) {
ActivityImpl nextScope = AtomicOperationTransitionNotifyListenerTake.findNextScope(activity, destination);
execution.setActivity(nextScope);
// Entering a scope: create the scope
execution.performOperation(TRANSITION_CREATE_SCOPE);
} else {
// This is where the transition is set to null
execution.setTransition(null);
// At last the destination activity is set
execution.setActivity(destination);
execution.performOperation(ACTIVITY_EXECUTE);
}
}
So the overall loop is:
PROCESS_START = new AtomicOperationProcessStart();
||
PROCESS_START_INITIAL = new AtomicOperationProcessStartInitial();
||
ACTIVITY_EXECUTE (execute) = new AtomicOperationActivityExecute();
||
TRANSITION_NOTIFY_LISTENER_END = new AtomicOperationTransitionNotifyListenerEnd();
||
TRANSITION_DESTROY_SCOPE = new AtomicOperationTransitionDestroyScope();
||
TRANSITION_NOTIFY_LISTENER_TAKE, or TRANSITION_NOTIFY_LISTENER_END (in the second case the loop simply runs once more)
||
TRANSITION_CREATE_SCOPE (create) = new AtomicOperationTransitionCreateScope();
||
TRANSITION_NOTIFY_LISTENER_START = new AtomicOperationTransitionNotifyListenerStart();
||
TRANSITION_CREATE_SCOPE (create again), or ACTIVITY_EXECUTE (enter again)
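That is the whole node-transition process. Its main points are:
1. Getting the destination node (destination), plus the scope handling when a transition starts and ends. A scope is an element that contains other activity nodes, such as a subprocess.
2. Firing listener events during the transition.
3. Executing the node according to its Behavior. Choosing which flow to take, and the decisions that differ between node types in the process definition, all live in the Behavior.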
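To tie the pieces together, here is a small trace of the loop for the simplest case: a flat process with no scopes and no listeners, where a start event leads straight to a user task and the user task waits. It is only a sketch under those assumptions. TransitionLoopSketch and its Op enum are hypothetical, the enum merely reuses the operation names from the chain above, and the node ids are borrowed from the (startevent1)--flow1-->(usertask1) example.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical trace of the operation loop for a flat, listener-free process.
class TransitionLoopSketch {

    enum Op {
        ACTIVITY_EXECUTE,
        TRANSITION_NOTIFY_LISTENER_END,
        TRANSITION_DESTROY_SCOPE,
        TRANSITION_NOTIFY_LISTENER_TAKE,
        TRANSITION_CREATE_SCOPE,
        TRANSITION_NOTIFY_LISTENER_START
    }

    /** Returns "operation@currentActivity" for every step from source to destination. */
    static List<String> trace(String source, String destination) {
        List<String> trace = new ArrayList<>();
        String activity = source;
        boolean sourceLeft = false;
        Op op = Op.ACTIVITY_EXECUTE;
        while (op != null) {
            trace.add(op + "@" + activity);
            switch (op) {
                case ACTIVITY_EXECUTE:
                    // The source's behavior leaves at once; the destination waits.
                    op = sourceLeft ? null : Op.TRANSITION_NOTIFY_LISTENER_END;
                    sourceLeft = true;
                    break;
                case TRANSITION_NOTIFY_LISTENER_END:
                    op = Op.TRANSITION_DESTROY_SCOPE;
                    break;
                case TRANSITION_DESTROY_SCOPE:
                    // No scope is left in a flat process, so go on to TAKE.
                    op = Op.TRANSITION_NOTIFY_LISTENER_TAKE;
                    break;
                case TRANSITION_NOTIFY_LISTENER_TAKE:
                    // Only now does the current activity switch to the destination side.
                    activity = destination;
                    op = Op.TRANSITION_CREATE_SCOPE;
                    break;
                case TRANSITION_CREATE_SCOPE:
                    op = Op.TRANSITION_NOTIFY_LISTENER_START;
                    break;
                case TRANSITION_NOTIFY_LISTENER_START:
                    op = Op.ACTIVITY_EXECUTE;
                    break;
            }
        }
        return trace;
    }

    public static void main(String[] args) {
        for (String step : trace("startevent1", "usertask1")) {
            System.out.println(step);
        }
    }
}
```

The trace makes the earlier observation visible: the current activity stays on the source through END, DESTROY_SCOPE and TAKE, and changes only after the flow has been taken.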